1 package org.apache.solr.cloud;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import com.google.common.cache.Cache;
21 import com.google.common.cache.CacheBuilder;
22
23 import org.apache.solr.client.solrj.impl.HttpSolrClient;
24 import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
25 import org.apache.solr.common.SolrException;
26 import org.apache.solr.common.cloud.ClusterState;
27 import org.apache.solr.common.cloud.ClusterStateUtil;
28 import org.apache.solr.common.cloud.DocCollection;
29 import org.apache.solr.common.cloud.Replica;
30 import org.apache.solr.common.cloud.Slice;
31 import org.apache.solr.common.cloud.ZkStateReader;
32 import org.apache.solr.core.CloudConfig;
33 import org.apache.solr.update.UpdateShardHandler;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.slf4j.MDC;
37
38 import java.io.Closeable;
39 import java.lang.invoke.MethodHandles;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.Comparator;
44 import java.util.HashMap;
45 import java.util.HashSet;
46 import java.util.Map;
47 import java.util.Set;
48 import java.util.TreeMap;
49 import java.util.concurrent.Callable;
50 import java.util.concurrent.ExecutorService;
51 import java.util.concurrent.TimeUnit;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
82
83 private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
84
85 private Integer lastClusterStateVersion;
86
87 private final ExecutorService updateExecutor;
88 private volatile boolean isClosed;
89 private ZkStateReader zkStateReader;
90 private final Cache<String,Long> baseUrlForBadNodes;
91 private Set<String> liveNodes = Collections.EMPTY_SET;
92
93 private final int workLoopDelay;
94 private final int waitAfterExpiration;
95
96 public OverseerAutoReplicaFailoverThread(CloudConfig config, ZkStateReader zkStateReader,
97 UpdateShardHandler updateShardHandler) {
98 this.zkStateReader = zkStateReader;
99
100 this.workLoopDelay = config.getAutoReplicaFailoverWorkLoopDelay();
101 this.waitAfterExpiration = config.getAutoReplicaFailoverWaitAfterExpiration();
102 int badNodeExpiration = config.getAutoReplicaFailoverBadNodeExpiration();
103
104 log.info(
105 "Starting "
106 + this.getClass().getSimpleName()
107 + " autoReplicaFailoverWorkLoopDelay={} autoReplicaFailoverWaitAfterExpiration={} autoReplicaFailoverBadNodeExpiration={}",
108 workLoopDelay, waitAfterExpiration, badNodeExpiration);
109
110 baseUrlForBadNodes = CacheBuilder.newBuilder()
111 .concurrencyLevel(1).expireAfterWrite(badNodeExpiration, TimeUnit.MILLISECONDS).build();
112
113
114
115 updateExecutor = updateShardHandler.getUpdateExecutor();
116
117
118
119
120 }
121
122 @Override
123 public void run() {
124
125 while (!this.isClosed) {
126
127 log.debug("do " + this.getClass().getSimpleName() + " work loop");
128
129
130
131 try {
132 doWork();
133 } catch (Exception e) {
134 SolrException.log(log, this.getClass().getSimpleName()
135 + " had an error in its thread work loop.", e);
136 }
137
138 if (!this.isClosed) {
139 try {
140 Thread.sleep(workLoopDelay);
141 } catch (InterruptedException e) {
142 Thread.currentThread().interrupt();
143 }
144 }
145 }
146 }
147
148 private void doWork() {
149
150
151 ClusterState clusterState = zkStateReader.getClusterState();
152
153 String autoAddReplicas = (String) zkStateReader.getClusterProps().get(ZkStateReader.AUTO_ADD_REPLICAS);
154 if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
155 return;
156 }
157 if (clusterState != null) {
158 if (clusterState.getZkClusterStateVersion() != null &&
159 clusterState.getZkClusterStateVersion().equals(lastClusterStateVersion) && baseUrlForBadNodes.size() == 0 &&
160 liveNodes.equals(clusterState.getLiveNodes())) {
161
162 return;
163 }
164
165 liveNodes = clusterState.getLiveNodes();
166 lastClusterStateVersion = clusterState.getZkClusterStateVersion();
167 Set<String> collections = clusterState.getCollections();
168 for (final String collection : collections) {
169 log.debug("look at collection={}", collection);
170 DocCollection docCollection = clusterState.getCollection(collection);
171 if (!docCollection.getAutoAddReplicas()) {
172 log.debug("Collection {} is not setup to use autoAddReplicas, skipping..", docCollection.getName());
173 continue;
174 }
175 if (docCollection.getReplicationFactor() == null) {
176 log.debug("Skipping collection because it has no defined replicationFactor, name={}", docCollection.getName());
177 continue;
178 }
179 log.debug("Found collection, name={} replicationFactor={}", collection, docCollection.getReplicationFactor());
180
181 Collection<Slice> slices = docCollection.getSlices();
182 for (Slice slice : slices) {
183 if (slice.getState() == Slice.State.ACTIVE) {
184
185 final Collection<DownReplica> downReplicas = new ArrayList<DownReplica>();
186
187 int goodReplicas = findDownReplicasInSlice(clusterState, docCollection, slice, downReplicas);
188
189 log.debug("collection={} replicationFactor={} goodReplicaCount={}", docCollection.getName(), docCollection.getReplicationFactor(), goodReplicas);
190
191 if (downReplicas.size() > 0 && goodReplicas < docCollection.getReplicationFactor()) {
192
193 processBadReplicas(collection, downReplicas);
194 } else if (goodReplicas > docCollection.getReplicationFactor()) {
195 log.debug("There are too many replicas");
196 }
197 }
198 }
199 }
200
201 }
202 }
203
204 private void processBadReplicas(final String collection, final Collection<DownReplica> badReplicas) {
205 for (DownReplica badReplica : badReplicas) {
206 log.debug("process down replica={} from collection={}", badReplica.replica.getName(), collection);
207 String baseUrl = badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP);
208 Long wentBadAtNS = baseUrlForBadNodes.getIfPresent(baseUrl);
209 if (wentBadAtNS == null) {
210 log.warn("Replica {} may need to failover.",
211 badReplica.replica.getName());
212 baseUrlForBadNodes.put(baseUrl, System.nanoTime());
213
214 } else {
215
216 long elasped = System.nanoTime() - wentBadAtNS;
217 if (elasped < TimeUnit.NANOSECONDS.convert(waitAfterExpiration, TimeUnit.MILLISECONDS)) {
218
219 log.debug("Looks troublesome...continue. Elapsed={}", elasped + "ns");
220 } else {
221 log.debug("We need to add a replica. Elapsed={}", elasped + "ns");
222
223 if (addReplica(collection, badReplica)) {
224 baseUrlForBadNodes.invalidate(baseUrl);
225 }
226 }
227 }
228 }
229 }
230
231 private boolean addReplica(final String collection, DownReplica badReplica) {
232
233
234 final String createUrl = getBestCreateUrl(zkStateReader, badReplica);
235 if (createUrl == null) {
236 log.warn("Could not find a node to create new replica on.");
237 return false;
238 }
239
240
241
242
243 final String dataDir = badReplica.replica.getStr("dataDir");
244 final String ulogDir = badReplica.replica.getStr("ulogDir");
245 final String coreNodeName = badReplica.replica.getName();
246 if (dataDir != null) {
247
248 final String coreName = badReplica.replica.getStr(ZkStateReader.CORE_NAME_PROP);
249 log.debug("submit call to {}", createUrl);
250 MDC.put("OverseerAutoReplicaFailoverThread.createUrl", createUrl);
251 try {
252 updateExecutor.submit(new Callable<Boolean>() {
253
254 @Override
255 public Boolean call() {
256 return createSolrCore(collection, createUrl, dataDir, ulogDir, coreNodeName, coreName);
257 }
258 });
259 } finally {
260 MDC.remove("OverseerAutoReplicaFailoverThread.createUrl");
261 }
262
263
264 boolean success = ClusterStateUtil.waitToSeeLiveReplica(zkStateReader, collection, coreNodeName, createUrl, 30000);
265 if (!success) {
266 log.error("Creating new replica appears to have failed, timed out waiting to see created SolrCore register in the clusterstate.");
267 return false;
268 }
269 return true;
270 }
271
272 log.warn("Could not find dataDir or ulogDir in cluster state.");
273
274 return false;
275 }
276
277 private static int findDownReplicasInSlice(ClusterState clusterState, DocCollection collection, Slice slice, final Collection<DownReplica> badReplicas) {
278 int goodReplicas = 0;
279 Collection<Replica> replicas = slice.getReplicas();
280 if (replicas != null) {
281 for (Replica replica : replicas) {
282
283 boolean live = clusterState.liveNodesContain(replica.getNodeName());
284 final Replica.State state = replica.getState();
285
286 final boolean okayState = state == Replica.State.DOWN
287 || state == Replica.State.RECOVERING
288 || state == Replica.State.ACTIVE;
289
290 log.debug("Process replica name={} live={} state={}", replica.getName(), live, state.toString());
291
292 if (live && okayState) {
293 goodReplicas++;
294 } else {
295 DownReplica badReplica = new DownReplica();
296 badReplica.replica = replica;
297 badReplica.slice = slice;
298 badReplica.collection = collection;
299 badReplicas.add(badReplica);
300 }
301 }
302 }
303 log.debug("bad replicas for slice {}", badReplicas);
304 return goodReplicas;
305 }
306
307
308
309
310
311
312 static String getBestCreateUrl(ZkStateReader zkStateReader, DownReplica badReplica) {
313 assert badReplica != null;
314 assert badReplica.collection != null;
315 assert badReplica.slice != null;
316 log.debug("getBestCreateUrl for " + badReplica.replica);
317 Map<String,Counts> counts = new HashMap<String, Counts>();
318 Set<String> unsuitableHosts = new HashSet<String>();
319
320 Set<String> liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes());
321
322 ClusterState clusterState = zkStateReader.getClusterState();
323 if (clusterState != null) {
324 Set<String> collections = clusterState.getCollections();
325 for (String collection : collections) {
326 log.debug("look at collection {} as possible create candidate", collection);
327 DocCollection docCollection = clusterState.getCollection(collection);
328
329 Collection<Slice> slices = docCollection.getSlices();
330 for (Slice slice : slices) {
331
332 if (slice.getState() == Slice.State.ACTIVE) {
333 log.debug("look at slice {} for collection {} as possible create candidate", slice.getName(), collection);
334 Collection<Replica> replicas = slice.getReplicas();
335
336 for (Replica replica : replicas) {
337 liveNodes.remove(replica.getNodeName());
338 String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
339 if (baseUrl.equals(
340 badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) {
341 continue;
342 }
343
344 log.debug("collection={} nodename={} livenodes={}", collection, replica.getNodeName(), clusterState.getLiveNodes());
345 boolean live = clusterState.liveNodesContain(replica.getNodeName());
346 log.debug("collection={} look at replica {} as possible create candidate, live={}", collection, replica.getName(), live);
347 if (live) {
348 Counts cnt = counts.get(baseUrl);
349 if (cnt == null) {
350 cnt = new Counts();
351 }
352 if (badReplica.collection.getName().equals(collection)) {
353 cnt.negRankingWeight += 3;
354 cnt.collectionShardsOnNode += 1;
355 } else {
356 cnt.negRankingWeight += 1;
357 }
358 if (badReplica.collection.getName().equals(collection) && badReplica.slice.getName().equals(slice.getName())) {
359 cnt.ourReplicas++;
360 }
361
362
363
364 Integer maxShardsPerNode = badReplica.collection.getMaxShardsPerNode();
365 if (maxShardsPerNode == null) {
366 log.warn("maxShardsPerNode is not defined for collection, name=" + badReplica.collection.getName());
367 maxShardsPerNode = Integer.MAX_VALUE;
368 }
369 log.debug("collection={} node={} max shards per node={} potential hosts={}", collection, baseUrl, maxShardsPerNode, cnt);
370
371 Collection<Replica> badSliceReplicas = null;
372 DocCollection c = clusterState.getCollection(badReplica.collection.getName());
373 if (c != null) {
374 Slice s = c.getSlice(badReplica.slice.getName());
375 if (s != null) {
376 badSliceReplicas = s.getReplicas();
377 }
378 }
379 boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl);
380 if (unsuitableHosts.contains(baseUrl) || alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) {
381 counts.remove(baseUrl);
382 unsuitableHosts.add(baseUrl);
383 log.debug("not a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
384 } else {
385 counts.put(baseUrl, cnt);
386 log.debug("is a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
387 }
388 }
389 }
390 }
391 }
392 }
393 }
394
395 for (String node : liveNodes) {
396 counts.put(zkStateReader.getBaseUrlForNodeName(node), new Counts(0, 0));
397 }
398
399 if (counts.size() == 0) {
400 log.debug("no suitable hosts found for getBestCreateUrl for collection={}", badReplica.collection.getName());
401 return null;
402 }
403
404 ValueComparator vc = new ValueComparator(counts);
405 Map<String,Counts> sortedCounts = new TreeMap<String, Counts>(vc);
406 sortedCounts.putAll(counts);
407
408 log.debug("empty nodes={} for collection={}", liveNodes, badReplica.collection.getName());
409 log.debug("sorted hosts={} for collection={}", sortedCounts, badReplica.collection.getName());
410 log.debug("unsuitable hosts={} for collection={}", unsuitableHosts, badReplica.collection.getName());
411
412 return sortedCounts.keySet().iterator().next();
413 }
414
415 private static boolean replicaAlreadyExistsOnNode(ClusterState clusterState, Collection<Replica> replicas, DownReplica badReplica, String baseUrl) {
416 if (replicas != null) {
417 log.debug("collection={} check if replica already exists on node using replicas {}", badReplica.collection.getName(), getNames(replicas));
418 for (Replica replica : replicas) {
419 final Replica.State state = replica.getState();
420 if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl)
421 && clusterState.liveNodesContain(replica.getNodeName())
422 && (state == Replica.State.ACTIVE || state == Replica.State.DOWN || state == Replica.State.RECOVERING)) {
423 log.debug("collection={} replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.collection.getName(), badReplica.replica.getName(), replica.getName(), replica.getNodeName());
424 return true;
425 }
426 }
427 }
428 log.debug("collection={} replica does not yet exist on node: {}", badReplica.collection.getName(), baseUrl);
429 return false;
430 }
431
432 private static Object getNames(Collection<Replica> replicas) {
433 Set<String> names = new HashSet<>(replicas.size());
434 for (Replica replica : replicas) {
435 names.add(replica.getName());
436 }
437 return names;
438 }
439
440 private boolean createSolrCore(final String collection,
441 final String createUrl, final String dataDir, final String ulogDir,
442 final String coreNodeName, final String coreName) {
443
444 try (HttpSolrClient client = new HttpSolrClient(createUrl)) {
445 log.debug("create url={}", createUrl);
446 client.setConnectionTimeout(30000);
447 client.setSoTimeout(60000);
448 Create createCmd = new Create();
449 createCmd.setCollection(collection);
450 createCmd.setCoreNodeName(coreNodeName);
451
452
453 createCmd.setCoreName(coreName);
454 createCmd.setDataDir(dataDir);
455 createCmd.setUlogDir(ulogDir);
456 client.request(createCmd);
457 } catch (Exception e) {
458 SolrException.log(log, "Exception trying to create new replica on " + createUrl, e);
459 return false;
460 }
461 return true;
462 }
463
464 private static class ValueComparator implements Comparator<String> {
465 Map<String,Counts> map;
466
467 public ValueComparator(Map<String,Counts> map) {
468 this.map = map;
469 }
470
471 public int compare(String a, String b) {
472 if (map.get(a).negRankingWeight >= map.get(b).negRankingWeight) {
473 return 1;
474 } else {
475 return -1;
476 }
477 }
478 }
479
480 @Override
481 public void close() {
482 isClosed = true;
483 }
484
485 public boolean isClosed() {
486 return isClosed;
487 }
488
489
490 private static class Counts {
491 int collectionShardsOnNode = 0;
492 int negRankingWeight = 0;
493 int ourReplicas = 0;
494
495 private Counts() {
496
497 }
498
499 private Counts(int totalReplicas, int ourReplicas) {
500 this.negRankingWeight = totalReplicas;
501 this.ourReplicas = ourReplicas;
502 }
503
504 @Override
505 public String toString() {
506 return "Counts [negRankingWeight=" + negRankingWeight + ", sameSliceCount="
507 + ourReplicas + ", collectionShardsOnNode=" + collectionShardsOnNode + "]";
508 }
509 }
510
511 static class DownReplica {
512 Replica replica;
513 Slice slice;
514 DocCollection collection;
515
516 @Override
517 public String toString() {
518 return "DownReplica [replica=" + replica.getName() + ", slice="
519 + slice.getName() + ", collection=" + collection.getName() + "]";
520 }
521 }
522
523 }